package main

import (
	"fmt"
	"log"
	"net/http"

	"golang.org/x/net/websocket"

	"github.com/nsqio/go-nsq"
)

var wss *websocket.Conn
var proddd *nsq.Producer

// nsq订阅消息
type ConsumerT struct{}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	if wss != nil {
		err := websocket.Message.Send(wss, string(msg.Body))
		if err != nil {
			log.Fatal(err)
		}
	}
	return nil
}

func _initConsumer(addrAndPort, channel, topic string) {
	c, err := nsq.NewConsumer(topic, channel, nsq.NewConfig()) // 新建一个消费者
	if err != nil {
		panic(err)
	}
	c.AddHandler(&ConsumerT{}) // 添加消息处理
	//	if errA := c.ConnectToNSQLookupd("http://127.0.0.1:4161"); errA != nil {
	if err := c.ConnectToNSQD(addrAndPort); err != nil { // 建立连接
		panic(err)
	}
}

// 新建生产者
func _initProducer(addrAndPort string) {
	var err error
	proddd, err = nsq.NewProducer(addrAndPort, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
}

// nsq发布消息
func Publish(topic, msg string) {
	var err error
	// 发布消息
	if err = proddd.Publish(topic, []byte(msg)); err != nil {
		panic(err)
	}
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	_initProducer("127.0.0.1:4150")
	_initConsumer("127.0.0.1:4150", "test-channel", "test")

	http.Handle("/ws", websocket.Handler(Echo))
	http.Handle("/send", http.HandlerFunc(userSendHandler))
	if err := http.ListenAndServe("localhost:10000", nil); err != nil {
		log.Fatal("ListenAndServe:", err)
	}
}

func userSendHandler(res http.ResponseWriter, req *http.Request) {
	Publish("test", req.URL.String())
	res.Write([]byte("OK"))
}

func Echo(ws *websocket.Conn) {
	var err error
	wss = ws

	for {
		var reply string
		if err = websocket.Message.Receive(ws, &reply); err != nil {
			fmt.Println("Can't receive")
			return
		}
		if reply != "" {
			Publish("test", reply)
		}
	}
}
